[SPARK-27190][SQL] add table capability for streaming #24129
Conversation
@cloud-fan, I think this should update V2WriteSupportCheck.
Mostly looks good.
I'm not familiar with V2WriteSupportCheck, but if it's a reasonably small change I agree that would make sense to update.
@@ -353,13 +354,15 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }

   class KafkaTable(strategy: => ConsumerStrategy) extends Table
-    with SupportsMicroBatchRead with SupportsContinuousRead with SupportsStreamingWrite {
+    with SupportsRead with SupportsWrite {
I don't necessarily think we should go back to the design, but it is a bit weird in context to see that SupportsRead and SupportsWrite end up being super-capabilities configured differently than normal capabilities.
I raised the same concern before. @rdblue, shall we reconsider it? I don't think we need the flexibility to change the read/write API (it will be a breaking change anyway). It's more important to make the API consistent with itself and to have a single capability API.
I don't think of these as super-capabilities; I think of them as a link between the v2 catalog API and the v2 source API. Using a trait like SupportsRead means we can later make changes to either one and continue to use the other. For example, if we added a Stream abstraction instead of using Table for both, then we would be able to have Stream implement SupportsRead. Does that make sense?
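For illustration, here is a minimal, self-contained Scala sketch of the mix-in pattern being described. The names echo the Spark DSv2 interfaces, but they are simplified stand-ins, and Stream and ExampleKafkaTable are purely hypothetical, not the real API:

```scala
// Simplified stand-ins for the DSv2 interfaces, for illustration only.
trait ScanBuilder {
  def description: String
}

// The ability to be read is expressed as a mix-in trait rather than on Table itself.
trait SupportsRead {
  def newScanBuilder(options: Map[String, String]): ScanBuilder
}

// Table only carries the catalog-facing identity.
trait Table {
  def name: String
}

// A hypothetical future Stream abstraction could reuse SupportsRead
// without being a Table, which is the flexibility being argued for here.
trait Stream extends SupportsRead {
  def streamId: String
}

// A concrete source mixes in only the capabilities it actually has.
class ExampleKafkaTable extends Table with SupportsRead {
  override def name: String = "kafka_topic"
  override def newScanBuilder(options: Map[String, String]): ScanBuilder =
    new ScanBuilder {
      override val description: String = s"scan of $name (${options.size} options)"
    }
}
```

The point being made is that the read capability lives in its own trait, so it could later be attached to abstractions other than Table.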
My understanding is that the Table interface is the link between the v2 source API and the v2 catalog API. If we want to change the abstraction later, I think that will be very painful; moving the read/write methods out into separate interfaces doesn't help much.
I agree with @jose-torres. The current mix is very confusing; it's difficult to know what to use.
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
val source = v1.get.dataSource.createSource(metadataPath)
nextSourceId += 1
logInfo(s"Using Source [$source] from DataSourceV2 named '$srcName' [$src]")
I know this wasn't introduced by the current PR, but this shouldn't say DataSourceV2.
Shall we mention something about v1 and v2? Some sources have both v1 and v2 implementations, and it might be helpful to have a log line saying which implementation is actually used.
I think it would be valuable to include whether v1 or v2 is used when the source supports both.
Test build #103608 has finished for PR 24129 at commit
@rdblue The check for the streaming side is done in I don't think it's an easy change to move the check to
If we do want to move the check, we should do it in another PR, which would introduce a non-trivial refactor.
retest this please
Test build #4636 has started for PR 24129 at commit
Test build #103677 has started for PR 24129 at commit
* An internal base interface of mix-in interfaces for writable {@link Table}. This adds
* {@link #newWriteBuilder(CaseInsensitiveStringMap)} that is used to create a write
* for batch or streaming.
* A mix-in interface of {@link Table}, to indicate that it's readable.
@jose-torres, @cloud-fan: Maybe the capability/trait distinction would be more clear if this stated "... to indicate that it's readable using the v2 datasource API".
We don't have a plan to introduce something similar to the Table API. I'd like to be clear right now and say this is a mix-in trait for Table.
That's fine with me. Can we make this change part of a separate documentation update?
 */
 @Evolving
-public interface Table {
+public interface Table extends BaseStreamingSink {
Why did this add BaseStreamingSink? It doesn't make sense to me that we would require all tables to be streaming sinks.
BaseStreamingSink is an empty internal interface, which is a common interface for the v1 and v2 streaming sinks. We will remove it after DSv2 is finalized and the v1 streaming sink can be removed.
If we don't want to pollute the Table interface, we can create a
public interface StreamingTable extends Table, BaseStreamingSink
@cloud-fan, your reply is not a justification for adding this. Please answer: Why is this change required for this commit to function correctly?
It's required because it's the common interface for streaming in V1 and V2. A streaming sink is represented by a Sink in V1 or a Table in V2; BaseStreamingSink is used in places where it doesn't matter which kind of sink it is. If this did not extend BaseStreamingSink the code wouldn't compile.
Because Table is used to identify a source in MicroBatchExecution, to track the progress of all the sources in a micro-batch query.
This is only a problem in MicroBatchExecution, because it needs to support the micro-batch source v1 (there is no continuous source v1) and DSv2 together. The way we did it is to create a common interface as the source representation for both the micro-batch source v1 and DSv2, so that we can unify the code that tracks the progress of sources; see https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
Why can't implementations extend BaseStreamingSink and leave this interface alone? Not all tables are streaming sinks, so this inheritance is incorrect.
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.TableCapability;
Nit: this commit includes non-functional changes. These docs changes are good to have, but they are not related to streaming and there is no need for this file to change in this commit. I would prefer to revert this and include it in a docs-only PR.
Same with the changes in WriteBuilder.
It's related to streaming: https://github.com/apache/spark/pull/24129/files#diff-d111d7e2179b55465840c9a81ea004f2R76
The only unnecessary change is the doc for batch. I replaced BATCH_READ with a Javadoc link, to be consistent with streaming.
Javadoc changes are non-functional changes. If you want to correct Javadoc, limit updates to files that are already changing in the PR.
As it is, this is a non-functional change in an unrelated file. The commit message is going to indicate this commit is related to streaming, so this could easily cause conflicts. Please remove it.
I don't agree with this. I added docs for streaming and changed the doc for batch a little to match the streaming doc style. It's very annoying to separate out a minor change like this. I consider this change self-contained.
I think there are too many documentation-only changes for this PR to be committed. Please separate them into a different PR. A docs-only commit will be easy to review and easy to cherry-pick based on the description.
Did I miss something here? The docs changes are totally related to the current PR (otherwise the doc would refer to a deleted interface).
The docs changes that I was referring to were moved already, this was just a poorly placed comment.
@cloud-fan, are you saying that validation cannot be done by the analyzer?
@rdblue I don't have a good idea about how to do the streaming check in the analyzer. Currently we just transform the plan and do the table capability check in
@cloud-fan, could you be more clear and include the details? Why is this being done in the physical plan nodes in the first place?
How do you come to this conclusion? One way I can think of is, let
FYI, this is the check for micro-batch; it's on the logical plan.
Sorry for the confusion. From the names, like
@cloud-fan, I don't think this commit is ready. I've left a few comments and responses. Please consider this a -1 until those are addressed.
@rdblue I've removed the It will be great to continue the discussion of #24129 (comment), although it's not quite related to this PR.
Test build #103834 has finished for PR 24129 at commit
retest this please
Test build #103847 has finished for PR 24129 at commit
@rdblue any more comments? I'd like to merge it soon and work on the schema check capability.
retest this please
Test build #104225 has finished for PR 24129 at commit
* A mix-in interface of {@link Table}, to indicate that it's readable.
* <p>
* This defines {@link #newScanBuilder(CaseInsensitiveStringMap)} that is used to create a scan
* builder for batch, micro-batch, or continuous processing.
I don't think this documentation change is necessary. Looks like it just reformats and rephrases a little.
The reason is that it's not an internal interface anymore. With the table capability API, users need to implement this interface explicitly.
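As a rough illustration of what implementing the interface explicitly looks like under the capability API, here is a hedged sketch. TableCapability, Table, SupportsRead, ScanBuilder, and ExampleStreamingTable are simplified stand-ins, not the exact Spark signatures:

```scala
// Simplified stand-ins for the capability API, for illustration only.
object TableCapability extends Enumeration {
  val MICRO_BATCH_READ, CONTINUOUS_READ, BATCH_READ = Value
}

trait ScanBuilder
trait Table {
  def capabilities: Set[TableCapability.Value]
}
trait SupportsRead extends Table {
  def newScanBuilder(options: Map[String, String]): ScanBuilder
}

// With the capability API, a source mixes in SupportsRead explicitly and
// also declares which execution modes its scans support.
class ExampleStreamingTable extends SupportsRead {
  override val capabilities: Set[TableCapability.Value] =
    Set(TableCapability.MICRO_BATCH_READ, TableCapability.CONTINUOUS_READ)
  override def newScanBuilder(options: Map[String, String]): ScanBuilder =
    new ScanBuilder {}
}
```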
retest this please
Test build #104656 has finished for PR 24129 at commit
Test build #104684 has finished for PR 24129 at commit
retest this please
Test build #104691 has finished for PR 24129 at commit
retest this please
Test build #104695 has finished for PR 24129 at commit
val allSupportsMicroBatch = streamingSources.forall(_.supports(MICRO_BATCH_READ))
// v1 streaming data source only supports micro-batch.
val allSupportsContinuous = streamingSources.forall(_.supports(CONTINUOUS_READ)) &&
  v1StreamingRelations.nonEmpty
I think this should be isEmpty, because v1 sources don't support continuous mode. If there is a v1 source and nonEmpty is true, then not all sources support continuous.
This would have been caught by a test. Can you add a few test cases for this, like the V2WriteSupportCheckSuite?
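If that reading is right, the corrected condition would look roughly like the following self-contained sketch. FakeTable and check are hypothetical stand-ins, not the actual analyzer rule:

```scala
// Hedged sketch of the corrected check, with simplified stand-in types.
object StreamingModeCheckSketch {
  final case class FakeTable(name: String, capabilities: Set[String]) {
    def supports(capability: String): Boolean = capabilities.contains(capability)
  }

  def check(streamingSources: Seq[FakeTable], v1StreamingRelations: Seq[String]): Unit = {
    val allSupportsMicroBatch = streamingSources.forall(_.supports("MICRO_BATCH_READ"))
    // v1 streaming sources only support micro-batch, so continuous execution is
    // only possible when there are *no* v1 relations: isEmpty rather than nonEmpty.
    val allSupportsContinuous =
      streamingSources.forall(_.supports("CONTINUOUS_READ")) && v1StreamingRelations.isEmpty
    if (!allSupportsMicroBatch && !allSupportsContinuous) {
      throw new IllegalArgumentException(
        "The streaming sources in a query do not have a common supported execution mode.")
    }
  }
}
```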
  v1StreamingRelations.nonEmpty
if (!allSupportsMicroBatch && !allSupportsContinuous) {
  throw new AnalysisException(
    "The streaming sources in a query do not have a common supported execution mode.")
This error message isn't very specific. Is it possible to show the table names that support each mode so that the user has enough feedback to know what went wrong? That would be more helpful.
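One possible shape for a more specific message, sketched here with hypothetical types and helpers (SourceInfo and errorMessage are illustrative, not the actual implementation):

```scala
// Hypothetical sketch: name the tables that block each mode instead of a generic message.
object StreamingModeErrorSketch {
  final case class SourceInfo(name: String, supportsMicroBatch: Boolean, supportsContinuous: Boolean)

  def errorMessage(sources: Seq[SourceInfo]): String = {
    val noMicroBatch = sources.filterNot(_.supportsMicroBatch).map(_.name)
    val noContinuous = sources.filterNot(_.supportsContinuous).map(_.name)
    "The streaming sources in a query do not have a common supported execution mode. " +
      s"Sources lacking micro-batch support: ${noMicroBatch.mkString(", ")}. " +
      s"Sources lacking continuous support: ${noContinuous.mkString(", ")}."
  }
}
```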
object V2StreamingScanSupportCheck extends (LogicalPlan => Unit) {
  import DataSourceV2Implicits._

  override def apply(plan: LogicalPlan): Unit = {
I think these checks should also include a validation for individual tables. If a table doesn't support streaming at all, a more helpful error message is that a specific table doesn't support streaming, not just that there isn't a streaming mode that can handle all of the sources.
If we add this check here, it will never be hit because we already checked it earlier in DataStreamReader. Like I explained, it's non-trivial to move the check from DataStreamReader to here, because it's coupled with the v2 -> v1 fallback logic.
I think eventually we will have an analyzer rule that checks the streaming scan capability and falls back to v1 if necessary. This checker rule is not suitable because it only traverses the plan without returning a new plan, so we can't implement the fallback logic in this rule.
Even if it will never be hit, it should still be in the analyzer so that plans that are created through other paths in the future are still checked. The idea is to avoid relying on a particular way of creating a logical plan to validate logical plans.
That's why I opened https://issues.apache.org/jira/browse/SPARK-27483
Plans that are created through other APIs (not DataStreamReader) still need the fallback logic, which cannot be done within this checker rule.
Why would this check depend on fallback logic? These checks run after resolution rules have reached a fixed point. If there is a streaming DSv2 relation in the plan, fallback should already be done. Fallback logic is separate and this check can be done here.
Why would this check depend on fallback logic?
It's the opposite: the fallback logic depends on the check. That said, the future analyzer rule would do three things:
- find the v2 streaming relation in the plan and check its scan capability
- if the check fails, fall back to the v1 relation and use it to replace the v2 streaming relation
- if fallback is not applicable, fail

You can see that it's not a simple check anymore, and it can't be done within this simpler checker rule (LogicalPlan => Unit); the sketch below shows the difference in rule shapes.
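A minimal sketch of that type-signature point, using a stand-in LogicalPlan and hypothetical helpers (planIsValid, rewriteToV1) rather than Catalyst itself:

```scala
// Simplified stand-ins; the helper logic is a placeholder, not Spark code.
object RuleShapesSketch {
  trait LogicalPlan
  def planIsValid(plan: LogicalPlan): Boolean = true      // placeholder capability check
  def rewriteToV1(plan: LogicalPlan): LogicalPlan = plan  // placeholder fallback rewrite

  // A checker rule can only observe the plan and throw; it cannot rewrite it.
  val checkerRule: LogicalPlan => Unit = plan =>
    require(planIsValid(plan), "unsupported streaming plan")

  // Fallback has to return a *new* plan, so it needs the transform-rule shape.
  val fallbackRule: LogicalPlan => LogicalPlan = plan =>
    if (planIsValid(plan)) plan else rewriteToV1(plan)
}
```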
@cloud-fan, let's get the validation in now.
I don't think that the fallback rule should be implemented as you describe. I think it should be done in two parts: the rule to fall back and update the plan, and this validation that all sources support streaming.
Spark should not combine transform rules and validations. There are a couple of reasons for this principle:
- Validations are used to ensure that the query is valid and to ensure that rules are run correctly. If the transform rule is added to the analyzer in a single-run batch, we want validation to catch that. These checks catch errors in Spark, too.
- Rules should be as small as possible and focused on a single task. The fallback rule should not fail analysis if it doesn't know what to do because some other rule may be added later that does. For example, what if we build an adapter from continuous execution to micro-batch execution for a source?
So we will need a validation rule either way. When the fallback rule runs and can't fix the problem, this check should be what fails the plan.
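To make the proposed split concrete, here is a hedged sketch with simplified stand-ins (Plan, V2StreamingRelation, V1StreamingRelation, and both functions are hypothetical, not the real Catalyst classes or rules): the fallback rule rewrites what it can and leaves everything else untouched, while a separate validation is the only thing that fails the plan.

```scala
// Simplified stand-ins, not the real Catalyst classes or rules.
object FallbackAndCheckSketch {
  sealed trait Plan
  final case class V2StreamingRelation(
      name: String, supportsStreaming: Boolean, hasV1Fallback: Boolean) extends Plan
  final case class V1StreamingRelation(name: String) extends Plan

  // Transform rule: rewrite what it can, silently leave everything else alone.
  def fallbackToV1(plan: Plan): Plan = plan match {
    case r: V2StreamingRelation if !r.supportsStreaming && r.hasV1Fallback =>
      V1StreamingRelation(r.name)
    case other => other
  }

  // Separate validation: the only place that fails the plan.
  def checkStreamingSupport(plan: Plan): Unit = plan match {
    case r: V2StreamingRelation if !r.supportsStreaming =>
      throw new IllegalStateException(s"Table ${r.name} does not support streaming reads")
    case _ => ()
  }
}
```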
added.
  case r: StreamingRelation => r
}

if ((streamingSources ++ v1StreamingRelations).length > 1) {
Minor: is ++ a cheap operation? There's no need to create a new seq here when this could check the lengths of both individually.
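The suggested alternative would look roughly like this trivial sketch (placeholder element types, not the actual rule):

```scala
// Checking the two lengths directly avoids materializing a concatenated sequence.
object LengthCheckSketch {
  def moreThanOneStreamingSource(
      streamingSources: Seq[AnyRef],
      v1StreamingRelations: Seq[AnyRef]): Boolean =
    streamingSources.length + v1StreamingRelations.length > 1
}
```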
Test build #104734 has finished for PR 24129 at commit
Test build #104733 has finished for PR 24129 at commit
Test build #104741 has finished for PR 24129 at commit
retest this please
Test build #104745 has finished for PR 24129 at commit
I've addressed all the comments. @rdblue, do you have any more comments?
Since @rdblue has no more comments and @jose-torres has LGTMed, I'm merging to master, thanks!
@cloud-fan, I was distracted this week by Spark Summit and writing talks. My position on this PR was quite clear:
I'd appreciate it if you would not commit changes with a clear -1 simply because you haven't heard from me in 3 days during a conference.
+1. My remaining concerns were addressed, but I think it was inappropriate to merge this without waiting for a review.
## What changes were proposed in this pull request?
While working on apache#24129, I realized that I missed some document fixes in apache#24285. This PR covers all of them.
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes apache#24295 from cloud-fan/doc.
This is a followup of apache#24012, to add the corresponding capabilities for streaming.
existing tests
Closes apache#24129 from cloud-fan/capability.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
This is a followup of #24012, to add the corresponding capabilities for streaming.
How was this patch tested?
existing tests